Многопоточное программирование в Java - Тимур Машнин
Этот метод также неоднократно вызывает методы интерфейса ManagedBlocker isReleasable и block, пока оба метода не вернут true.
Каждому вызову метода block предшествует вызов метода isReleasable, который возвращает false.
При блокировке, будет создан новый поток внутри пула, чтобы поддерживать параллелизм пока работает блокировка.
Как только блок с управляемым блокированием завершается, уровень параллелизма становится слишком высоким, поэтому, когда другой поток завершает выполнение своей задачи, вместо того чтобы ждать следующую задачу из очереди задач, он отбрасывается, тем самым возвращая параллелизм обратно к своему первоначальному уровню.
В этом примере мы блокируем рабочий поток вызовом метода Thread.sleep.
Для динамического расширения пула потоков мы можем обернуть этот вызов в ManagedBlocker.
Для этого мы создадим класс Sleeper, расширяющий интерфейс ManagedBlocker.
Здесь при вызове метода managedBlock, будет вызван метод isReleasable, он вернет false.
Поэтому будет вызван метод block и текущий поток заснет.
При этом в пуле будет создан новый поток.
Как только текущий поток проснется, метод isReleasable начнет возвращать true, это просигнализирует пулу потоков, что один поток можно убрать.
Для такой блокировки, мы в нашей задаче вызываем метод managedBlock, в который передаем экземпляр созданного класса с нужными параметрами.
Потокобезопасные коллекции
Фреймворк Collection, с его интерфейсами List, Set, и Map, обеспечивает потоковую безопасность с помощью синхронизированных оболочек.
Где используются синхронизированные блоки кода.
Однако эти потокобезопасные коллекции все еще имеют некоторые недостатки.
Например, необходимо блокировать коллекцию при ее итерации, иначе вы рискуете получить исключение ConcurrentModificationException.
Эти коллекции используют итераторы, предполагающие, что коллекция не изменит свое содержимое за время, когда поток выполняет итерацию через содержимое.
Если итератор такой коллекции обнаруживает, что во время итерации была внесена модификация, он выдает исключение ConcurrentModificationException.
Кроме того, если доступ к коллекции осуществляется из нескольких потоков, возникают проблемы с производительностью.
Поэтому появились новые классы коллекций в пакете java.util.concurrent.
Требование о том, что коллекция не изменяется во время итерации, неудобно для параллельных приложений.
Вместо этого предпочтительно разрешить одновременную модификацию.
Итераторы коллекций java.util.concurrent называются слабо согласованными итераторами.
Для этих классов, если элемент был удален с начала итерации и еще не возвращен методом next, он не будет возвращен вызывающему.
Если элемент был добавлен с начала итерации, он может быть возвращен или не возвращен вызывающему.
И ни один элемент не будет возвращен дважды в одной итерации, независимо от того, как изменилась базовая коллекция.
Это достигается с помощью создания новой копии коллекции каждый раз, когда элемент добавляется или удаляется, но выполняемая итерация продолжает работать над копией, которая была актуальна во время создания итератора.
Что касается хэш-карты, синхронизированная хэш-карта обеспечивает потоковую безопасность, синхронизируя каждый метод.
Это означает, что, когда один поток выполняет один из методов карты, другие потоки должны ждать до тех пор, пока первый поток не будет завершен, независимо от того, что они хотят делать с картой.
Напротив, класс пакета java.util.concurrent ConcurrentHashMap позволяет множественным операциям чтения-записи выполняться параллельно.
ConcurrentHashMap невозможно заблокировать для исключительного использования, она предназначена для одновременного доступа.
Итераторы, возвращенные ConcurrentHashMap, также слабо согласованы, что означает, что они не будут бросать ConcurrentModificationException.
Что касается очередей, очереди могут быть ограниченными или неограниченными.
При одновременном доступе к ограниченной очереди может возникнуть ошибка добавления элемента в уже заполненную очередь или ошибка удаления элемента из уже пустой очереди.
Поэтому желательно, чтобы поток блокировал очередь при операциях с ней.
В случае неограниченной очереди, при одновременном доступе, если будет дисбаланс между производителем и потребителем, когда удаление элементов будет медленнее, чем добавление элементов, система может исчерпать память, так как длина очереди будет расти без ограничений.
Блокирование неограниченной очереди позволит контролировать поток и ограничить производителей.
Исходя из всего этого, ограниченная, блокирующая очередь позволяет ограничить ресурсы, используемые данной очереди естественным способом.
Поэтому в параллельных вычислениях используется очередь BlockingQueue и ее производные классы LinkedBlockingQueue, PriorityBlockingQueue, ArrayBlockingQueue, SynchronousQueue и другие.
Синхронизаторы
Мы уже рассматривали низкоуровневую синхронизацию потоков с помощью мониторов.
Классы синхронизаторы представляют высокоуровневый программный интерфейс для регулирования работы потоков.
Семафоры используются для ограничения количества потоков для доступа к некоторому ресурсу.
В этом примере мы создаем задачу и 10 потоков, которые обрабатывают эту задачу.
Но так как в задаче мы создали семафор с 3 разрешениями, только три потока одновременно могу обрабатывать эту задачу, остальные потоки ждут, пока какой-нибудь из выполняющихся потоков не освободит семафор.
CountDownLatch (замок с обратным отсчетом) заставляет одни потоки ждать до тех пор, пока другие потоки не выполнят определенное количество операций и не уменьшат счетчик CountDownLatch до нуля.
Блокировка потоков снимается с помощью счётчика, любой действующий поток, при выполнении определенной операции уменьшает значение счётчика.
Когда счётчик достигает 0, все ожидающие потоки разблокируются и продолжают выполняться.
В этом примере у нас есть счетчик со значением три.
И есть два потока.
В одном потоке мы вызываем метод await счетчика и этот поток начинает ждать, пока счетчик не уменьшится до нуля.
В другом потоке мы выполняем три операции, каждый раз уменьшая значение счетчика на единицу с помощью метода countDown.
Как только счетчик обнуляется, другой поток просыпается.
CyclicBarrier позволяет собрать потоки у барьера, выполнить дополнительное действие, освободить потоки, затем собрать потоки у другого барьера, выполнить дополнительное действие, освободить потоки и так далее.
При создании барьера вы указываете, сколько потоков будут участвовать в этой синхронизации, и объект Runnable, представляющий дополнительное действие, которое будет выполнено, когда все потоки соберутся у барьера.
В этом примере мы создаем два барьера и два потока.
Два потока стартуют и вызывают метод await для первого барьера.
Как только последний поток вызывает метод await, он прибывает к барьеру, выполняется действие барьера